Eureka分为Client端和Server端,Client端向Server端注册自己的服务信息,并且拉取所有服务的注册信息,Server端作为注册中心,负责接收Client端的注册信息,维护所有服务的注册信息,Server端也可以开启集群模式,相互之间同步服务的注册信息。
与缓存相关的三个变量
1.registry
1 | private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry |
AbstractInstanceRegistry的成员变量,相当于服务端的注册表,维护所有服务的注册信息,第一层Map的key是服务的应用名称,value也是一个map,其中key是实例id,value是实例的详细信息(Lease
2.readWriteCacheMap
1 | private final LoadingCache<Key, ResponseCacheImpl.Value> readWriteCacheMap; |
ResponseCacheImpl的成员变量,使用的是guava的LoadingCache构建的缓存,其中key为com.netflix.eureka.registry包下的Key对象,value是ResponseCacheImpl的内部类Value对象,从名称上可以看出它是一个读写缓存。
3.readOnlyCacheMap
1 | private final ConcurrentMap<Key, ResponseCacheImpl.Value> readOnlyCacheMap = new ConcurrentHashMap(); |
ResponseCacheImpl的成员变量,同样是一个ConcurrentMap,从名称上可以看出它是一个只读的缓存对象。
所以Eureka Server使用了一个读写分离的两级缓存机制,registry负责维护所有服务的注册信息,当register数据有变化,将会更新readWriteCacheMap的内容,后台开启定时任务,默认30s从readWriteCacheMap同步数据到readOnlyCacheMap,Eureka Client拉取服务的注册信息时,从readOnlyCacheMap读取数据,并没有直接从readWriteCacheMap中获取数据:
Client从Server端拉取数据流程:
(1)Client端DiscoveryClient的构造函数中,会初始化定时任务;
(2)缓存刷新任务,定时发送请求到Server端,拉取服务的注册数据,Server端收到请求,首先会根据请求信息构建key,根据key从缓存中获取数据;
(3)Server端根据key获取缓存数据时,会判断是否开启了使用readOnly缓存:
如果开启:先从readOnlyCacheMap获取,如果从readOnlyCacheMap未获取到数据,再从readWriteCacheMap中查找;
如果未开启:直接从readWriteCacheMap中获取;
(4)从readWriteCacheMap获取数据也有两种结果:
第一:获取到数据,如果开启了readOnly缓存,会将结果设置到readOnlyCacheMap中,否则直接返回结果即可;
第二:未获取到数据,由于readWriteCacheMap使用的是guava构建的缓存,从readWriteCacheMap根据key获取数据,假如key不存在的活,就会触发load方法,在这个方法中会调用generatePayload方法从registry中获取数据构并建缓存数据返回;
(5)Server端将结果设置到response返回到Client端;
源码
Eureka Server
AbstractInstanceRegistry
以register服务注册方法看一下缓存更新的流程,在register方法中,服务进行注册的时候,会根据服务的应用名称appName,调用invalidateCache方法清除之前存在的缓存。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47public abstract class AbstractInstanceRegistry implements InstanceRegistry{
// server端的注册表,key是服务的应用名称,value又是一个map, key是实例id,value是实例的详细信息(Lease<InstanceInfo>)
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();
...
// 注册服务
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
this.read.lock();
// 根据应用名称从注册表中获取实例
Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
EurekaMonitors.REGISTER.increment(isReplication);
// 如果获取的实例为空
if(gMap == null) {
// 创建一个hashmap
ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
// 加入注册表
gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
if(gMap == null) {
gMap = gNewMap;
}
}
// 根据实例ID获取,实例的详细信息
Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());
...
registrant.setActionType(ActionType.ADDED);
this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 清除缓存
this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), Boolean.valueOf(isReplication)});
} finally {
this.read.unlock();
}
}
// 清除缓存
private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
this.responseCache.invalidate(appName, vipAddress, secureVipAddress);
}
}
ResponseCacheImpl
(1)ResponseCacheImpl中包含了readOnlyCacheMap只读缓存对象和readWriteCacheMap读写缓存对象,其中readWriteCacheMap使用的是guava的缓存机制,如果根据key从readWriteCacheMap获取数据时没有这个key,将会调用load方法,在load方法中是调用generatePayload构建Value对象的,在generatePayload是从registry获取数据的,也就是说当readWriteCacheMap不存在某个key是会从registry获取数据。
(2)在构造函数中,初始化了readWriteCacheMap对象,并判断是否使用readOnlyCacheMap缓存,如果开启,设置定时任务getCacheUpdateTask,定时从readWriteCacheMap同步数据到readOnlyCacheMap,同步方式是遍历readOnlyCacheMap,判断value是否与readWriteCacheMap的数据一致,如果不一致,以readWriteCacheMap的数据为准,更新readOnlyCacheMap的数据。
(3)invalidate方法用来清除readWriteCacheMap中的缓存.
1 | public class ResponseCacheImpl implements ResponseCache { |
ApplicationsResource
ApplicationsResource的getContainers接收Client端的拉取请求,在getContainers方法中,首先会根据请求信息构建缓存Key,根据key从缓存中获取数据,并设置到response中返回给Client端。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public class ApplicationsResource {
public Response getContainers(@PathParam("version") String version, @HeaderParam("Accept") String acceptHeader, @HeaderParam("Accept-Encoding") String acceptEncoding, @HeaderParam("X-Eureka-Accept") String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if(!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions);
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
if(!this.registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
} else {
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = KeyType.JSON;
String returnMediaType = "application/json";
if(acceptHeader == null || !acceptHeader.contains("json")) {
keyType = KeyType.XML;
returnMediaType = "application/xml";
}
// 构建缓存key
Key cacheKey = new Key(EntityType.Application, "ALL_APPS", keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);
Response response;
// 如果acceptEncoding是gizp
if(acceptEncoding != null && acceptEncoding.contains("gzip")) {
response = Response.ok(this.responseCache.getGZIP(cacheKey)).header("Content-Encoding", "gzip").header("Content-Type", returnMediaType).build();
} else {
// 从缓存中根据key获取数据设置到response中
response = Response.ok(this.responseCache.get(cacheKey)).build();
}
return response;
}
}
}
回到ResponseCacheImpl,get方法用来根据key从缓存中获取数据,最终调用的是getValue方法,在getValue方法中可以看到,如果开启了只读缓存,先从readOnlyCacheMap中获取数据,如果未获取到再从readWriteCacheMap中获取,如果未开启只读缓存,直接从readWriteCacheMap中获取,如果readWriteCacheMap不存在某个key,可以回看ResponseCacheImpl构造函数初始化readWriteCacheMap时有一个load方法,不存在某个key时触发该方法,实际上是去registry中取数据并构建缓存数据:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public class ResponseCacheImpl implements ResponseCache {
...
// 根据keky获取缓存数据
public String get(Key key) {
return this.get(key, this.shouldUseReadOnlyResponseCache);
}
String get(Key key, boolean useReadOnlyCache) {
// 根据key从缓存map中获取value
ResponseCacheImpl.Value payload = this.getValue(key, useReadOnlyCache);
return payload != null && !payload.getPayload().equals("")?payload.getPayload():null;
}
ResponseCacheImpl.Value getValue(Key key, boolean useReadOnlyCache) {
ResponseCacheImpl.Value payload = null;
try {
// 如果使用ReadOnlyCache
if(useReadOnlyCache) {
// 从ReadOnlyCache获取缓存数据
ResponseCacheImpl.Value currentPayload = (ResponseCacheImpl.Value)this.readOnlyCacheMap.get(key);
// 如果获取不为空
if(currentPayload != null) {
payload = currentPayload;
} else {
// 如果获取为空,从readWriteCacheMap中获取数据
payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
// 将获取的数据加入readOnlyCacheMap中
this.readOnlyCacheMap.put(key, payload);
}
} else {
// 如果不使用readOnlyCacheMap,直接从readWriteCacheMap获取数据
payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
}
} catch (Throwable var5) {
logger.error("Cannot get value for key : {}", key, var5);
}
return payload;
}
}
Eureka Client
DiscoveryClient
(1)在DiscoveryClient的构造方法中,会判断是否需要拉取数据,并且初始化定时任务;
(2)初始化定时任务方法中,会添加缓存更新的定时任务,是在CacheRefreshThread的run方法中实现的,在run方法中调用refreshRegistry刷新缓存,在refreshRegistry方法中又调用了fetchRegistry,所以最终使用的fetchRegistry方法从服务端拉取数据;
(3)fetchRegistry方法中会判断是全量还是增量从Server端拉取数据;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136public class DiscoveryClient implements EurekaClient {
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {
...
// 如果需要拉取数据
if(this.clientConfig.shouldFetchRegistry() && !this.fetchRegistry(false)) {
// 从备用服务拉取数据
this.fetchRegistryFromBackup();
}
...
// 初始化定时任务
this.initScheduledTasks();
...
}
// 初始化定时任务
private void initScheduledTasks() {
int renewalIntervalInSecs;
int expBackOffBound;
// 是否需要拉取数据
if(this.clientConfig.shouldFetchRegistry()) {
renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 注册刷新缓存的定时任务
this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
}
if(this.clientConfig.shouldRegisterWithEureka()) {
renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: renew interval is: {}", Integer.valueOf(renewalIntervalInSecs));
// 注册发送心跳的定时任务
this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread(null)), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
// 构建InstanceInfoReplicator,其run方法中会调用discoveryClient.register()发送HTTP请求进行服务注册
this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
this.statusChangeListener = new StatusChangeListener() {
public String getId() {
return "statusChangeListener";
}
public void notify(StatusChangeEvent statusChangeEvent) {
if(InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {
DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
} else {
DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);
}
DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
}
};
if(this.clientConfig.shouldOnDemandUpdateStatusChange()) {
this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);
}
this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
// 缓存刷新任务
class CacheRefreshThread implements Runnable {
CacheRefreshThread() {
}
public void run() {
// 调用refreshRegistry刷新缓存
DiscoveryClient.this.refreshRegistry();
}
}
void refreshRegistry() {
try {
...
// 调用fetchRegistry从服务端拉取数据
boolean success = this.fetchRegistry(remoteRegionsModified);
if(success) {
this.registrySize = ((Applications)this.localRegionApps.get()).size();
this.lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
...
} catch (Throwable var9) {
logger.error("Cannot fetch registry from server", var9);
}
}
// 拉取数据
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = this.FETCH_REGISTRY_TIMER.start();
label122: {
boolean var4;
try {
Applications applications = this.getApplications();
if(!this.clientConfig.shouldDisableDelta() && Strings.isNullOrEmpty(this.clientConfig.getRegistryRefreshSingleVipAddress()) && !forceFullRegistryFetch && applications != null && applications.getRegisteredApplications().size() != 0 && applications.getVersion().longValue() != -1L) {
// 增量拉取数据
this.getAndUpdateDelta(applications);
} else {
logger.info("Disable delta property : {}", Boolean.valueOf(this.clientConfig.shouldDisableDelta()));
logger.info("Single vip registry refresh property : {}", this.clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", Boolean.valueOf(forceFullRegistryFetch));
logger.info("Application is null : {}", Boolean.valueOf(applications == null));
logger.info("Registered Applications size is zero : {}", Boolean.valueOf(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", Boolean.valueOf(applications.getVersion().longValue() == -1L));
// 全量拉取数据
this.getAndStoreFullRegistry();
}
applications.setAppsHashCode(applications.getReconcileHashCode());
this.logTotalInstances();
break label122;
} catch (Throwable var8) {
logger.error("DiscoveryClient_{} - was unable to refresh its cache! status = {}", new Object[]{this.appPathIdentifier, var8.getMessage(), var8});
var4 = false;
} finally {
if(tracer != null) {
tracer.stop();
}
}
return var4;
}
// 缓存刷新事件
this.onCacheRefreshed();
this.updateInstanceRemoteStatus();
return true;
}
}
参考:
[xmz_java:Eureka 缓存结构以及服务感知优化](https://www.cnblogs.com/xmzJava/p/11359636.html)
[宜信技术:详解Eureka 缓存机制](https://www.cnblogs.com/yixinjishu/p/10871243.html)
Spring Cloud版本:Finchley.RELEASE